;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns org.apache.storm.clojure
  (:use [org.apache.storm util])
  (:import [org.apache.storm StormSubmitter])
  (:import [org.apache.storm.generated StreamInfo])
  (:import [org.apache.storm.tuple Tuple])
  (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
  (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
  (:import [org.apache.storm.utils Utils])
  (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
  (:import [java.util Collection List])
  (:require [org.apache.storm [thrift :as thrift]]))

(defn direct-stream
  "Builds a StreamInfo for the given output `fields` with its second
  constructor argument (the direct flag) set to true."
  [fields]
  (new StreamInfo fields true))

(defn to-spec
  "Returns a two-element vector [namespace-string name-string] taken from
  the :ns and :name metadata of `avar`."
  [avar]
  (let [{var-ns :ns var-name :name} (meta avar)]
    [(str var-ns) (str var-name)]))

(defn clojure-bolt*
  "Constructs a ClojureBolt from the [ns name] specs of `fn-var` and
  `conf-fn-var`, the given `args`, and the thrift output spec built from
  `output-spec`."
  [output-spec fn-var conf-fn-var args]
  (let [fn-spec (to-spec fn-var)
        conf-spec (to-spec conf-fn-var)
        outputs (thrift/mk-output-spec output-spec)]
    (ClojureBolt. fn-spec conf-spec args outputs)))

(defmacro clojure-bolt
  "Expands to a call to clojure-bolt*, passing the vars named by `fn-sym`
  and `conf-fn-sym` (resolved with the `var` special form at the call site)."
  [output-spec fn-sym conf-fn-sym args]
  (list `clojure-bolt*
        output-spec
        (list 'var fn-sym)
        (list 'var conf-fn-sym)
        args))

(defn clojure-spout*
  "Constructs a ClojureSpout from the [ns name] specs of `fn-var` and
  `conf-var`, the given `args`, and the thrift output spec built from
  `output-spec`."
  [output-spec fn-var conf-var args]
  ;; The former `(let [m (meta fn-var)] ...)` wrapper bound a local that was
  ;; never read; it has been dropped so this mirrors clojure-bolt*.
  (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec)))

(defmacro clojure-spout
  "Expands to a call to clojure-spout*, passing the vars named by `fn-sym`
  and `conf-sym` (resolved with the `var` special form at the call site)."
  [output-spec fn-sym conf-sym args]
  (list `clojure-spout*
        output-spec
        (list 'var fn-sym)
        (list 'var conf-sym)
        args))

(defn normalize-fns
  "Takes a sequence of method forms shaped like (name [args...] & impl) and
  returns a lazy sequence of the same forms with a freshly gensym'd \"this\"
  symbol prepended to each argument vector."
  [body]
  (map (fn [[method-name params & method-body]]
         (let [this-sym (gensym "this")
               full-params (vec (cons this-sym params))]
           (concat [method-name full-params] method-body)))
       body))

(defmacro bolt
  "Expands to a `reify` of IBolt. Leading non-symbol forms in `body` are
  treated as IBolt method definitions and get an implicit `this` argument
  via normalize-fns; everything from the first bare symbol onward is spliced
  into the reify unchanged."
  [& body]
  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)
        methods (normalize-fns method-forms)]
    `(reify IBolt
       ~@methods
       ~@trailing-forms)))

(defmacro bolt-execute
  "Shorthand for a `bolt` form containing a single `execute` method whose
  argument vector and body are given by `body`."
  [& body]
  (list `bolt (cons 'execute body)))

(defmacro spout
  "Expands to a `reify` of ISpout. Leading non-symbol forms in `body` are
  treated as ISpout method definitions and get an implicit `this` argument
  via normalize-fns; everything from the first bare symbol onward is spliced
  into the reify unchanged."
  [& body]
  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)
        methods (normalize-fns method-forms)]
    `(reify ISpout
       ~@methods
       ~@trailing-forms)))

;; Defines a bolt named `name` with the given `output-spec`.
;;
;; An optional options map may follow `output-spec`; the keys read here are
;;   :params  - argument vector used for the generated conf and worker fns;
;;              when present, `name` is defined as a function of those args
;;   :conf    - form used as the body of the generated conf function
;;   :prepare - when truthy, `impl` is used verbatim as the tail of a `fn`
;;              form; otherwise (the default when the key is absent) `impl`
;;              is treated as the args + body of a single `execute` method
;;
;; Expands to a `do` that defines `<name>__conf__`, `<name>__`, and `name`.
(defmacro defbolt [name output-spec & [opts & impl :as all]]
  (if-not (map? opts)
    ;; No options map was supplied: re-expand with an empty one, treating
    ;; every remaining form as the implementation.
    `(defbolt ~name ~output-spec {} ~@all)
    (let [worker-name (symbol (str name "__"))
          conf-fn-name (symbol (str name "__conf__"))
          params (:params opts)
          conf-code (:conf opts)
          fn-body (if (:prepare opts)
                    (cons 'fn impl)
                    ;; Non-prepared form: impl is ([tuple collector] & body).
                    ;; The second arg becomes the collector parameter of the
                    ;; generated prepare fn; only the first arg is passed on
                    ;; to `execute`.
                    (let [[args & impl-body] impl
                          coll-sym (nth args 1)
                          args (vec (take 1 args))
                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
          ;; With :params, `name` is a function that forwards its arguments
          ;; to clojure-bolt; without, `name` is a single bolt object built
          ;; with an empty argument list.
          definer (if params
                    `(defn ~name [& args#]
                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
                    `(def ~name
                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
                    )
          ]
      `(do
         (defn ~conf-fn-name ~(if params params [])
           ~conf-code
           )
         (defn ~worker-name ~(if params params [])
           ~fn-body
           )
         ~definer
         ))))

;; Defines a spout named `name` with the given `output-spec`.
;;
;; An optional options map may follow `output-spec`; the keys read here are
;;   :params  - argument vector used for the generated conf and worker fns;
;;              when present, `name` is defined as a function of those args
;;   :conf    - form used as the body of the generated conf function
;;   :prepare - defaults to true when absent/nil. When truthy, `impl` is used
;;              verbatim as the tail of a `fn` form; when false, `impl` is
;;              treated as ([collector] & body) for a single zero-argument
;;              `nextTuple` method
;;
;; Expands to a `do` that defines `<name>__conf__`, `<name>__`, and `name`.
(defmacro defspout [name output-spec & [opts & impl :as all]]
  (if-not (map? opts)
    ;; No options map was supplied: re-expand with an empty one, treating
    ;; every remaining form as the implementation.
    `(defspout ~name ~output-spec {} ~@all)
    (let [worker-name (symbol (str name "__"))
          conf-fn-name (symbol (str name "__conf__"))
          params (:params opts)
          conf-code (:conf opts)
          prepare? (:prepare opts)
          ;; Unlike defbolt, a missing :prepare means "prepared".
          prepare? (if (nil? prepare?) true prepare?)
          fn-body (if prepare?
                    (cons 'fn impl)
                    ;; Non-prepared form: the first arg names the collector
                    ;; parameter of the generated fn; the body becomes
                    ;; `nextTuple`, which takes no arguments.
                    (let [[args & impl-body] impl
                          coll-sym (first args)
                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
          ;; With :params, `name` is a function that forwards its arguments
          ;; to clojure-spout; without, `name` is a single spout object built
          ;; with an empty argument list.
          definer (if params
                    `(defn ~name [& args#]
                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
                    `(def ~name
                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
                    )
          ]
      `(do
         (defn ~conf-fn-name ~(if params params [])
           ~conf-code
           )
         (defn ~worker-name ~(if params params [])
           ~fn-body
           )
         ~definer
         ))))

(defprotocol TupleValues
  "Conversion of a user-supplied value container into the value list handed
  to a collector's emit call."
  (tuple-values [values collector stream]
    "Returns the values to emit for `values` on `stream`, given `collector`."))

(extend-protocol TupleValues
  ;; Map values: look up the stream's declared output fields (via the
  ;; TopologyContext stored under :context on the collector) and return the
  ;; map's values in that field order. Keyword keys are matched by their
  ;; name; other keys are matched as-is. Fields absent from the map yield nil.
  java.util.Map
  (tuple-values [this collector ^String stream]
    (let [^TopologyContext context (:context collector)
          fields (.. context (getThisOutputFields stream) toList)
          ;; Build into a fresh persistent map rather than `(empty this)`:
          ;; `empty` returns nil for non-Clojure java.util.Map instances
          ;; (yielding a list that cannot be called as a lookup fn) and
          ;; throws for records, so only plain Clojure maps used to work.
          by-field-name (into {} (for [[k v] this]
                                   [(if (keyword? k) (name k) k) v]))]
      (vec (map by-field-name fields))))
  ;; List values are already positional and are emitted unchanged.
  java.util.List
  (tuple-values [this collector stream]
    this))

(defn- collectify
  "Returns `obj` unchanged when it is sequential or a java.util.Collection;
  otherwise wraps it in a one-element vector."
  [obj]
  (cond
    (sequential? obj) obj
    (instance? Collection obj) obj
    :else [obj]))

;; Emits `values` from a bolt on `stream` (default: Utils/DEFAULT_STREAM_ID),
;; anchored to `anchor` (default: no anchors). A single non-collection anchor
;; is wrapped into a one-element vector; `values` is normalised through
;; tuple-values before being handed to the OutputCollector stored under
;; :output-collector on `collector`.
(defnk emit-bolt! [collector values
                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
  (let [^OutputCollector out (:output-collector collector)
        ^List anchors (collectify anchor)
        tuple (tuple-values values collector stream)]
    (.emit out stream anchors tuple)))

;; Emits `values` from a bolt directly to `task` on `stream` (default:
;; Utils/DEFAULT_STREAM_ID), anchored to `anchor` (default: no anchors).
;; A single non-collection anchor is wrapped into a one-element vector;
;; `values` is normalised through tuple-values before being handed to the
;; OutputCollector stored under :output-collector on `collector`.
(defnk emit-direct-bolt! [collector task values
                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
  (let [^OutputCollector out (:output-collector collector)
        ^List anchors (collectify anchor)
        tuple (tuple-values values collector stream)]
    (.emitDirect out task stream anchors tuple)))

(defn ack!
  "Calls .ack with `tuple` on the OutputCollector stored under
  :output-collector on `collector`."
  [collector ^Tuple tuple]
  (let [^OutputCollector out (:output-collector collector)]
    (.ack out tuple)))

(defn fail!
  "Calls .fail with `tuple` on the OutputCollector stored under
  :output-collector on `collector`."
  [collector ^Tuple tuple]
  (let [^OutputCollector out (:output-collector collector)]
    (.fail out tuple)))

(defn reset-timeout!
  "Calls .resetTimeout with `tuple` on the OutputCollector stored under
  :output-collector on `collector`."
  [collector ^Tuple tuple]
  (let [^OutputCollector out (:output-collector collector)]
    (.resetTimeout out tuple)))

(defn report-error!
  "Reports `error` through the OutputCollector stored under
  :output-collector on `collector`.

  The second argument is the Throwable to report. It was previously named
  `tuple` and hinted as ^Tuple, which did not match the Throwable parameter
  of OutputCollector.reportError; the call is positional, so existing
  callers are unaffected."
  [collector ^Throwable error]
  (.reportError ^OutputCollector (:output-collector collector) error))

;; Emits `values` from a spout on `stream` (default: Utils/DEFAULT_STREAM_ID)
;; with message id `id` (default: nil). `values` is normalised through
;; tuple-values before being handed to the SpoutOutputCollector stored under
;; :output-collector on `collector`.
(defnk emit-spout! [collector values
                    :stream Utils/DEFAULT_STREAM_ID :id nil]
  (let [^SpoutOutputCollector out (:output-collector collector)
        tuple (tuple-values values collector stream)]
    (.emit out stream tuple id)))

;; Emits `values` from a spout directly to `task` on `stream` (default:
;; Utils/DEFAULT_STREAM_ID) with message id `id` (default: nil). `values` is
;; normalised through tuple-values before being handed to the
;; SpoutOutputCollector stored under :output-collector on `collector`.
(defnk emit-direct-spout! [collector task values
                           :stream Utils/DEFAULT_STREAM_ID :id nil]
  (let [^SpoutOutputCollector out (:output-collector collector)
        tuple (tuple-values values collector stream)]
    (.emitDirect out task stream tuple id)))

(defmacro defalias
  "Defines an alias for a var: a new var with the same root binding (if
  any) and similar metadata. The metadata of the alias is its initial
  metadata (as provided by def) merged into the metadata of the original."
  ;; Two-argument form: `name` is the new symbol, `orig` the symbol of the
  ;; var being aliased. Evaluates to the new var.
  ([name orig]
   `(do
      (alter-meta!
        ;; Define the alias with the original's raw root value when it has
        ;; one, otherwise as an unbound var; `def` returns the new var, which
        ;; is what alter-meta! then operates on.
        (if (.hasRoot (var ~orig))
          (def ~name (.getRawRoot (var ~orig)))
          (def ~name))
        ;; When copying metadata, disregard {:macro false}.
        ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
        ;; `%` is the alias's own metadata: keep it (minus :macro) and add
        ;; only those entries of the original's metadata whose keys the alias
        ;; does not already have (:macro is always taken from the original).
        #(conj (dissoc % :macro)
               (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
      (var ~name)))
  ;; Three-argument form: attaches `doc` as :doc metadata on `name` and
  ;; delegates to the two-argument form.
  ([name orig doc]
   (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))

;; Expose the thrift topology-building helpers in this namespace under
;; names without the `mk-` prefix.
(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
(defalias shell-spout-spec thrift/mk-shell-spout-spec)

(defn submit-remote-topology
  "Passes `name`, `conf` and `topology` to StormSubmitter/submitTopology."
  [name conf topology]
  (. StormSubmitter (submitTopology name conf topology)))

(defn local-cluster
  "Instantiates org.apache.storm.LocalCluster with its no-argument
  constructor and returns it."
  []
  ;; The class is named only inside an eval'd form, so it is resolved when
  ;; this function is called rather than when this namespace is compiled.
  ;; That avoids a cyclic dependency of
  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
  (eval (list 'new 'org.apache.storm.LocalCluster)))
